package com.atguigu.day10;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQL05_Sink_File {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.读取端口数据
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = socketTextStream.map(line -> {
            String[] fields = line.split(",");
            return new WaterSensor(fields[0],
                    Long.parseLong(fields[1]),
                    Double.parseDouble(fields[2]));
        });

        //3.创建文本数据连接器
        tableEnv.connect(new FileSystem()
                .path("input/sensor1.txt"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("ts", DataTypes.BIGINT())
                        .field("vc", DataTypes.DOUBLE()))
                .createTemporaryTable("sensor");

        //4.使用TableAPI
        Table sensorTable = tableEnv.fromDataStream(waterSensorDS);

        //5.过滤数据并查询数据
        Table resultTable = sensorTable
                .where($("id").isEqual("ws_001"))
                .select($("id"), $("ts"), $("vc"));

        //报错,当前数据会对已经进入的数据做修改
//        Table resultTable = sensorTable
//                .groupBy($("id"), $("ts"))
//                .aggregate($("vc").sum().as("vcSum"))
//                .select($("id"), $("ts"), $("vcSum"));

        //6.将数据通过连接器写入文件
        resultTable.executeInsert("sensor");

        //7.启动
        env.execute();

    }

}
